Redshift に認証失敗したリモートホストを検出して通知したい
今回は Redshift に一定回数、認証失敗した接続元を検出し、通知する仕組みを実装する機会があったので紹介します。
検出条件は 「30 分間に 10 回以上、認証に失敗した通信元の検出」とします。
Redshift のログ
Redshift には 3 つのログオプションがあります。
- 監査ログ
- STL テーブル
- AWS CloudTrail
今回のようにログインなどのアクティビティを調査する場合、「監査ログ」か「STL テーブル」を使用することになるかと思います。
それぞれの違いについては、以下 AWS 公式の表が解りやすくまとまっているので参照ください。
今回は直近 30 分のログを調査できれば良いので STL テーブルを利用することにしました。
長期間にわたるログであったり、負荷が高くなるようなクエリであれば Athena を使って監査ログから調べるなど、用途に応じて「監査ログ」「STL テーブル」を使い分けていただければ良いかと思います。
構成
今回の構成は以下のようにしています。
パブリック接続可能な Redshift ですが IP 制限を行っているため、VPC Lambda としています。SNS エンドポイントには NAT Gateway を抜けてアクセスします。あとは 30 分ごとに実行するために CloudWatch Events を利用します。
ネットワークまわりは各環境にあわせて変更していただければ良いかと思います。
実装内容
事前準備
以下の内容は本筋からそれるので割愛します。
- NAT Gateway の設置
- SNS トピックおよびサブスクリプションの設定
- CloudWatch Events の設定
- パラメータストアの設定
- 構成図に描き忘れていましたが、ユーザID、パスワードは AWS Systems Manager のパラメータストアに格納しています
Lambda Layer の作成
今回は Python(boto3) で実装していますが、pandas
と psycopg2
のモジュールを使っています。これらを Lambda Layer に登録しておきます。
pandas
pip
コマンドでローカル環境にインストールした pandas
を zip 化して Lambda Layer に登録します。
$ mkdir python $ pip install -t ./python pandas $ zip -r pandas.zip python
psycopg2
psycopg2
モジュールは pandas
のように pip install
で作成しても、動的リンク先の libpq
が Lambda の実行環境に含まれていないため、以下のようなエラーになり動作しません。
Unable to import module 'lambda_function': libpq.so.5: cannot open shared object file: No such file or directory
詳細は irbbb の記事を参照ください。
記事のように自分でコンパイルしても良いのですが、今回はGithub で公開されていたコチラのコンパイル済みのパッケージを利用させていただきました。今回は Python3.7 で準備していたのでリンク先より psycopg2-3.7
ディレクトリを取得します。
/python
ディレクトリ化に展開した awslambda-psycopg2
ディレクトリを psycopg2
にリネームして zip 化します。
$ tree . └── python └── psycopg2 ├── __init__.py ├── _ipaddress.py ├── _json.py ├── _lru_cache.py ├── _psycopg.cpython-37m-x86_64-linux-gnu.so ├── _range.py ├── compat.py ├── errorcodes.py ├── errors.py ├── extensions.py ├── extras.py ├── pool.py ├── psycopg1.py ├── sql.py └── tz.py $ zip -r psycopg2.zip python
zip 化した 2 つのモジュールを Lambda Layer に登録します。
Lambda 関数の作成
Lambda 関数は以下のように作成しました。ロールは検証のため AdministratorAccess ポリシーをアタッチしていますが、必要に応じて権限は絞り込んでください。
項目 | 値 |
---|---|
ランタイム | Python 3.7 |
ロール | AdministratorAccess ポリシーをアタッチ |
VPC | Custom VPC |
Lambda Layer | pandas と psycopg2 を利用 |
環境変数
幾つかの環境変数を設定しています。
環境変数名 | 説明 |
---|---|
dbname | 接続するデータベース名を指定 |
endpoint | Redshift クラスターのエンドポイントを指定 |
port | Redshift クラスターの接続ポートを指定 |
user_ps | ユーザ名用のパラメータストア名を指定 |
password_ps | パスワード用のパラメータストア名を指定 |
threshold | しきい値を指定(今回は 10 にしました) |
sns_topic_arn | 通知先の SNS トピック ARN を指定 |
env | メールの Subject に含める任意の環境名を指定 |
コード
本題のコードは以下のとおりです。
import boto3 import os import logging import psycopg2 import pandas as pd logger = logging.getLogger() logger.setLevel(logging.INFO) endpoint = os.environ['endpoint'] port = os.environ['port'] dbname = os.environ['dbname'] user_ps = os.environ['user_ps'] password_ps = os.environ['password_ps'] ssm = boto3.client('ssm') sns = boto3.client('sns') def lambda_handler(event, context): # SQL 文 # 実行時点〜30分前の間の authentication failure になった通信元をカウント sql = """ SELECT remotehost, count(*) FROM stl_connection_log WHERE event = 'authentication failure' AND recordtime BETWEEN (SYSDATE + INTERVAL '-30 minute') and SYSDATE GROUP BY remotehost; """ try: df = sql_query(sql) except Exception as e: logger.error(e) raise e msg = [] for index, row in df.iterrows(): threshold = os.environ['threshold'] if row['count'] >= int(threshold): logging.warning('remote host %s was exceeded threshold of authentication failure [%i times]' % (row['remotehost'], row['count'])) m = ('%s [%i times]' % (row['remotehost'], row['count'])) msg.append(m) if msg: sns_send_message(msg) # 接続情報 def get_connection(): # ユーザ名の取得 user = ssm.get_parameter( Name = user_ps, WithDecryption = True )['Parameter']['Value'] # パスワードの取得 password = ssm.get_parameter( Name = password_ps, WithDecryption = True )['Parameter']['Value'] dsn = { "host" : endpoint, "port" : port, "database" : dbname, "user" : user, "password" : password, } con = psycopg2.connect(**dsn) return con # SQL の実行 def sql_query(sql): with get_connection() as conn: return pd.read_sql(sql=sql, con=conn) # SNS トピックに送信 def sns_send_message(msg): env = os.environ['env'] topic = os.environ['sns_topic_arn'] sendmsg = sorted(msg) sendmsg.insert(0, 'The following IP address was exceeded threshold of authentication failure.\nPlease check stl_connection_log\n') subject = ('[ALERT]:(%s) Redshift Cluster authentication failure detection.' % env) try: response = sns.publish( TopicArn = topic, Message = "\n".join(sendmsg), Subject = subject ) except Exception as e: logger.error(e) raise e
今回は通信元の remotehost
フィールドのみで集計していますが、要件によってデータベース名やユーザ名など複数フィールドでグループ化してください。
psycopg2
を使って pandas.read_sql()
で読み込む方法については、以下の記事を参考にしましたのであわせてお読みください。
確認
上記のとおり Lambda が準備できたら、試しに該当の Redshift クラスターに対してログイン失敗を 10 回ほど行った後、Lambda のテストを実行すると以下のように接続失敗した通信元の IP アドレスと 30 分間に失敗したカウント数が通知されます。
あとは、この Lambda を CloudWatch Event で 30 分ごとに実行するように仕込めば完成です。
さいごに
今回は Redshift の STL テーブルを使って、ログイン失敗のしきい値を超えた通信元を通知する仕組みを紹介しました。
Redshift のセキュリティ監視要件等で必要があれば参考にしていただければ幸いです。
以上!大阪オフィスの丸毛(@marumo1981)でした!